Add extension support to the pipeline engine#2113
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests.
Additional details and impacted files@@ Coverage Diff @@
## main #2113 +/- ##
===========================================
- Coverage 87.27% 81.97% -5.31%
===========================================
Files 553 181 -372
Lines 181329 51898 -129431
===========================================
- Hits 158252 42542 -115710
+ Misses 22543 8822 -13721
Partials 534 534
🚀 New features to boost your workflow:
|
Receivers and exporters both receive |
| /// Provides a minimal set of capabilities — primarily node identity and logging. | ||
| /// Extensions that need periodic timers should use `tokio::time::interval` directly. | ||
| #[derive(Clone)] | ||
| pub struct EffectHandler { |
There was a problem hiding this comment.
In local mode, extensions and pipeline nodes share a single LocalSet thread, so anything that blocks between .await points - sync I/O, heavy crypto, thread::sleep - will stall the whole pipeline silently. Probably worth documenting the non-blocking requirement on the Extension trait so implementors know upfront.
Also noticed EffectHandler doesn't have a spawn_blocking helper. Authors who need to run blocking work will either reach for tokio::task::spawn_blocking directly (works, but not discoverable) or block the thread without realising. Something like:
pub async fn spawn_blocking<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
tokio::task::spawn_blocking(f)
.await
.expect("blocking task panicked")
}would make the safe path obvious. One thing to note - !Send fields can't cross into the closure, so callers need to extract/clone before passing in, might be worth a doc note on the method.
There was a problem hiding this comment.
This issue applies equally to all node types (receivers, processors, exporters). They all share the same single-threaded runtime. None of them currently provide a spawn_blocking helper. I think documenting the non-blocking contract and potentially adding a spawn_blocking helper would be better as a follow-up that covers all node types uniformly, not just extensions.
| @@ -0,0 +1,126 @@ | |||
| // Copyright The OpenTelemetry Authors | |||
There was a problem hiding this comment.
do we need local/shared versions of extensions? both here and in the extensionwrapper? it seems like we already use arc for cloning and sync support anyway, and extensions are send only. so maybe we can just make it so that extensions don't have this separation?
There was a problem hiding this comment.
The extension's service handles are Arc-based and Send + Sync, but the extension implementation itself can hold !Send internal state. This mirrors the pattern used by receivers, processors, and exporters. They all have local/shared variants. Since the engine runs on current_thread + LocalSet, !Send is the natural default. Removing the local variant would force extension authors to add unnecessary Send boilerplate for state that never leaves the thread. I'd prefer to keep it for consistency and flexibility.
| /// | ||
| /// Returns an [`AuthError`] if credentials are unavailable | ||
| /// (e.g., token not yet refreshed, provider unreachable). | ||
| fn get_request_metadata(&self) |
There was a problem hiding this comment.
does client here mean clients that use http headers? I think something that is more agnostic and focuses more on atomic functionality could be more widely useful. something like I did in my pr -> BearerTokenProvider or sth like that, that returns bearer token. How the consumer uses it is none of our concern. This is also very beneficial if consumer wants to have access to stuff like expiration date of bearer token etc easily.
@lalitb @utpilla, I second this. In my view, all node types should be able to access extensions. However, before we can get there, we first need to introduce an |
|
@utpilla First feedback, given that I haven't read the entire PR. I like the idea of reusing the Before continuing the review, I'd really like to see a concrete example of an extension configuration (in our YAML files) and how it hooks up to, for example, a receiver. |
| /// Implement this trait in an auth extension to provide client-side | ||
| /// authentication. The extension decides what headers to attach | ||
| /// (e.g., `Authorization: Bearer <token>`, custom API key headers). | ||
| pub trait ClientAuthenticator: Send { |
There was a problem hiding this comment.
can async traits (funcs) be supported in this pattern?
|
Please don't merge this PR without my approval. I have been working on an extension system as well and I have a different opinion on how I think extensions should be implemented. I think the core idea is very similar in many cases, but I would like us to work together on this feature @utpilla. |
Based on utpilla's insight in open-telemetry#2113 that extensions never touch pipeline data.
@lquerel You could check this diff to get an idea of how a sample config could look like on both receiver and exporter end: utpilla#3 |
# Change Summary This PR adds a design proposal describing the extension system for the **OTel Dataflow Engine**. The document introduces a capability-based extension architecture allowing receivers, processors, and exporters to access non-pdata functionality through well-defined capability interfaces maintained in the engine core. The proposal covers: * core concepts such as **capabilities**, **extension providers**, and **extension instances** * integration of extensions into the **existing configuration model** * the **user experience** for declaring extensions and binding capabilities * the **developer experience** for implementing extension providers * the **runtime architecture** for resolving and instantiating extensions * the **execution models** supported by extensions (local vs shared) * comparison with the **Go Collector extension model** * a **phased evolution plan** (native extensions → hierarchical placement → WASM extensions) * implementation recommendations for building **high-performance extensions aligned with the engine's thread-per-core design** The goal of this document is to provide maintainers with a clear architectural proposal to review before implementing the extension system. ## What issue does this PR close? * Related to #2267, #2230, #2141, #2113 ## How are these changes tested? This PR introduces **documentation only** and does not modify runtime code. ## Are there any user-facing changes? Yes. This proposal describes a **future extension system** that will introduce new configuration capabilities such as: * an `extensions` section in pipeline configurations * a `capabilities` section in node definitions These changes are not implemented yet but outline the intended user-facing configuration model for extensions. --------- Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
Change Summary
Introduces first-class extension support into the dataflow pipeline engine. Extensions are non-pipeline components that provide cross-cutting capabilities (e.g., authentication, health checks, service discovery) to receivers and exporters without participating in the
pdataflow.Motivation
Pipeline components like receivers and exporters often need shared services — credential management, token refresh, header validation — that don't fit the receiver → processor → exporter data-flow model. Extensions provide a clean separation: an independent task produces service handles, and pipeline components consume them at startup via a type-safe registry.
What's included
Engine core
ExtensionWrapper— Unified wrapper supporting bothSendand!Sendextension implementations, analogous to ReceiverWrapper/ExporterWrapper.local::Extension/shared::Extensiontraits — Lifecycle trait with astart()method that receives a control channel and effect handler.ExtensionConfig— Runtime configuration (control channel capacity) for extensions. Extensions only receive control messages, no pdata channels.ExtensionControlMsg— PData-free control message enum (Shutdown,TimerTick,CollectTelemetry).ExtensionFactory— Factory struct (not generic over PData) registered via#[distributed_slice].ExtensionHandles/ExtensionRegistryBuilder/ExtensionRegistry— Type-safe,Clone + Sendregistry. Extension factories register typed handles; pipeline components retrieve them by(extension_name, TypeId)at startup.ServerAuthenticator/ClientAuthenticatortraits — Pluggable auth contract for receivers (validate incoming requests) and exporters (attach outgoing credentials), with cloneable handle wrappers (ServerAuthenticatorHandle,ClientAuthenticatorHandle).Engine macros (engine-macros)
#[pipeline_factory]macro now generates anEXTENSION_FACTORIESdistributed slice and aget_<prefix>_extension_factory_map()helper.Config (config)
NodeKind::Extensionrecognized in URN parsing (:extensionsuffix).Pipeline components (
otap,contrib-nodes,validation,benchmarks)start()signatures now acceptExtensionRegistryas a third parameter._extension_registry(unused) — no behavioral changes.What's NOT included
No concrete extension implementations are shipped yet (no entries in OTAP_EXTENSION_FACTORIES).
What issue does this PR close?
How are these changes tested?
Are there any user-facing changes?
Yes